# Building Blocks使用

此处使用JGroup 4.x版本,基于JDK 8

Building Blocks是针对JChannel的应用更上层的抽象。一般来说,JChannel作为发送信息(send、receive)的最小单元,而Building Blocks是基于JChannel提供更实用的功能。

下面以最基础的组员间消息分发为例,可以使用发送消息(消息分发)和RPC方式的方法调用两种。RPC方式是基于消息分发进行的扩展。

# 使用消息分发

消息分发使用MessageDispatcher,MessageDispatcher需要绑定JChannel和RequestHandler,前者用于发送消息,后者用于处理接收到的消息。

public class DispatchWithBuildingBlocksTest implements RequestHandler {

    public void startInMessage() throws Exception {

        // 创建两个JChannel,使用默认配置,并命名
        JChannel channelA = new JChannel().name("channelA");
        JChannel channelB = new JChannel().name("channelB");

        MessageDispatcher dispA = new MessageDispatcher(channelA, this);
        MessageDispatcher dispB = new MessageDispatcher(channelB, this);

        // 连接到同一个group中
        channelA.connect("test group");
        channelB.connect("test group");

        for(int i = 0; i < 10; i++){
            Util.sleep(100);
            System.out.println("Casting message #" + i);
            byte[] payload = ("number #" + i).getBytes();
            RspList list = dispA.castMessage(null, payload, 0, payload.length, RequestOptions.SYNC());
            System.out.println("Responses: \n" + list);
        }

        Util.close(dispA, dispB, channelA, channelB);
    }

    @Override
    public Object handle(Message message) throws Exception {
        System.out.println("handle():" + message);
        return "Success!";
    }
}

这里使用RspList list = dispA.castMessage(null, payload, 0, payload.length, RequestOptions.SYNC());命令发送消息,表示发送给所有组员,且使用同步方式。此处同步方式封装了实际的RequestOptions的创建方式,其实内部设置为等待所有请求返回才返回。

默认的实现是使用同步的方式。也可以改用MessageDispatcher.setAsynDispatching(true);来开启异步状态。

# 使用RPC调用

RPC调用使用反射的方式。这种方式更灵活,注册JChannel和需要调用的方法的对象(Rpc对象,但对象类型不需要做额外声明),底层采用反射的方式就可以针对对象进行方法调用。

public class DispatchWithBuildingBlocksTest{

    public void startInRpc() throws Exception{
        RequestOptions opts = new RequestOptions(ResponseMode.GET_ALL, 5000);

        JChannel channelA = new JChannel().name("channelA");
        JChannel channelB = new JChannel().name("channelB");

        MethodCall call = new MethodCall(getClass().getMethod("print", int.class));

        RpcDispatcher dispA = new RpcDispatcher(channelA, this);
        RpcDispatcher dispB = new RpcDispatcher(channelB, this);

        channelA.connect("test group");
        channelB.connect("test group");

        for(int i = 0; i < 10; i++){
            Util.sleep(100);
            call.setArgs(i);
            RspList rspList = dispA.callRemoteMethods(null, call, opts);
            System.out.println("Responses: " + rspList);
        }
        Util.close(dispA, dispB, channelA, channelB);

    }

    public int print(int number) throws Exception{
        return number * 2;
    }
}

这里也可以看到,RequestOptions对象的创建实际上是包含了ResponseMode,这里使用的GET_ALL含义是方法阻塞到获取所有的组员的response后,再返回。

# 用MethodLookUp接口提高反射性能

实际上频繁地调用Rpc的默认接口是有性能损耗的,原因在于每次定义了方法名,JGroups底层都是根据反射方式在注册的Rpc对象中去寻找方法,因此每次都是一次反射调用。JGroups的作者针对这种场景,优化为可以我们直接将反射调用的方法保存下来,每次调用的时候直接按照一个id值去快速找到指定的方法对象,省去了反射寻找的性能消耗。

public interface MethodLookup {
    Method findMethod(short var1);
}

MethodLookUp接口是在RcpDispatcher中含有的一个属性,但官方并没有实现,这其实是交给用户去自定义的接口。这里我自定义一个根据Hashmap来做的简单实现。

public class MethodMap implements MethodLookup {

    private Map<Short, Method> methodMap = new HashMap<>();

    public void put(short i, Method method) {
        this.methodMap.put(i, method);
    }

    @Override
    public Method findMethod(short i) {
        return this.methodMap.get(i);
    }
}

按照此接口实现的RPC调用:

// 定义需要rpc调用的方法
public class Function {
    private int t = 0;

    public void add(int i){
        t += i;
    }

    public int get(){
        return t;
    }
}

public class RpcAdvanced {


    public static void main(String[] args){
        try {
            new RpcAdvanced().methodLookup();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void methodLookup() throws Exception {
        RequestOptions opts = new RequestOptions(ResponseMode.GET_ALL, 5000);
        
        Function function = new Function();

        JChannel channelA = new JChannel().name("channelA");
        JChannel channelB = new JChannel().name("channelB");
        RpcDispatcher dispA = new RpcDispatcher(channelA, function);
        RpcDispatcher dispB = new RpcDispatcher(channelB, function);

        MethodMap methodMap = new MethodMap();

        methodMap.put((short) 0, function.getClass().getMethod("add", int.class));
        methodMap.put((short)1, function.getClass().getMethod("get"));
        dispA.setMethodLookup(methodMap);
        dispB.setMethodLookup(methodMap);
        channelA.connect("test group");
        channelB.connect("test group");

        MethodCall methodCall = new MethodCall((short)0, 2);

        dispA.callRemoteMethods(null, methodCall, opts);

        System.out.println(dispB.callRemoteMethods(null, new MethodCall((short)1), opts));

        Util.close(dispA, dispB, channelA, channelB);
    }
}

# 用MethodInvoker自定义方法调用

针对另一种更极端的场景,假设考虑到不允许使用反射的项目,直接使用反射可能会违反规定。这个时候JGroups开放了MethodInvoker接口。

public interface MethodInvoker {
    Object invoke(Object var1, short var2, Object[] var3) throws Exception;
}

以下是具体实现。

public class MyMethodInvoker implements MethodInvoker {
    @Override
    public Object invoke(Object o, short i, Object[] objects) throws Exception {
        // 假设这里传过来的都是Function
        Function f = (Function) o;
        switch (i){
            case 0:
                f.add((int)objects[0]);
                return null;
            case 1:
                return f.get();
            default:
                throw new UnsupportedOperationException();
        }
    }
}

public class RpcSelfDefined {
    public static void main(String[] args) throws Exception {
        new RpcSelfDefined().methodInvokeTest();
    }
    public void methodInvokeTest() throws Exception {
        RequestOptions opt = new RequestOptions(ResponseMode.GET_ALL, 5000);
        JChannel channelA = new JChannel().name("channelA");
        JChannel channelB = new JChannel().name("channelB");
        Function function = new Function();

        RpcDispatcher dispA = new RpcDispatcher(channelA, function);
        RpcDispatcher dispB = new RpcDispatcher(channelB, function);

        MethodInvoker methodInvoker = new MyMethodInvoker();

        dispA.setMethodInvoker(methodInvoker);
        dispB.setMethodInvoker(methodInvoker);

        MethodCall methodCall = new MethodCall((short)0, 1);

        dispA.callRemoteMethods(null, methodCall, opt);
        MethodCall methodCall1 = new MethodCall((short)1);

        System.out.println(dispB.callRemoteMethods(null, methodCall1, opt));
        Util.close(dispA, dispB, channelA, channelB);
    }
}